// Copyright 2014 The mqrouter Author. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package kafka

import (
	"context"
	"encoding/json"
	"fmt"
	"strings"
	"time"

	"gitee.com/xfrm/middleware/xmq/pub"

	"github.com/segmentio/kafka-go"

	"gitee.com/xfrm/middleware/xlog"
	"gitee.com/xfrm/middleware/xtime"
	"gitee.com/xfrm/middleware/xtrace"

	. "gitee.com/xfrm/middleware/internal/middlewareconf"
)

// KafkaHandler pairs a fetched message with the reader it came from, so
// the message's offset can be committed later via CommitMsg.
type KafkaHandler struct {
	msg    kafka.Message // fetched message pending commit
	reader *kafka.Reader // reader used to commit the message's offset
}

// KafkaReader embeds kafka.Reader and adds a stop channel used to
// terminate the background stats-reporting goroutine started by init.
type KafkaReader struct {
	*kafka.Reader
	stop chan struct{} // signals the stats goroutine to exit
}

// NewKafkaHandler builds a KafkaHandler that binds msg to the reader it
// was fetched from, for later offset commit.
func NewKafkaHandler(reader *kafka.Reader, msg kafka.Message) *KafkaHandler {
	h := new(KafkaHandler)
	h.reader = reader
	h.msg = msg
	return h
}

// CommitMsg commits the wrapped message's offset to the broker using the
// originating reader. The passed context bounds the commit call.
func (m *KafkaHandler) CommitMsg(ctx context.Context) error {
	return m.reader.CommitMessages(ctx, m.msg)
}

// NewKafkaReader constructs a KafkaReader consuming topic from the given
// brokers, starting at the last offset, and launches a background
// goroutine that reports reader stats once per second until Close.
func NewKafkaReader(brokers []string, topic, groupId string, partition, minBytes, maxBytes int, commitInterval time.Duration) *KafkaReader {
	cfg := kafka.ReaderConfig{
		Brokers:        brokers,
		Topic:          topic,
		GroupID:        groupId,
		Partition:      partition,
		MinBytes:       minBytes,
		MaxBytes:       maxBytes,
		CommitInterval: commitInterval,
		StartOffset:    kafka.LastOffset,
		//MaxWait:        30 * time.Second,
		Logger:      xlog.GetInfoLogger(),
		ErrorLogger: getErrorLogger(),
	}

	r := &KafkaReader{
		Reader: kafka.NewReader(cfg),
		stop:   make(chan struct{}),
	}
	go r.init()

	return r
}

// NewReader wraps an existing kafka.ReaderConfig in a KafkaReader.
//
// Fix: previously the stop channel was left nil and the stats goroutine
// was never started, so calling Close on a reader built here blocked
// forever (send on a nil channel never proceeds) and no stats were
// reported. The reader is now initialized the same way as in
// NewKafkaReader: stop is allocated and the per-second stats reporter
// is launched.
func NewReader(config *kafka.ReaderConfig) *KafkaReader {
	reader := &KafkaReader{
		Reader: kafka.NewReader(*config),
		stop:   make(chan struct{}),
	}
	go reader.init()
	return reader
}

// init periodically (once per second) snapshots the reader's statistics
// and forwards them to statKafkaReader, until the stop channel fires.
func (m *KafkaReader) init() {
	tick := time.NewTicker(time.Second)
	defer tick.Stop()
	for {
		select {
		case <-m.stop:
			return
		case <-tick.C:
			s := m.Stats()
			statKafkaReader(m.Config().Topic, &s)
		}
	}
}

// setKafkaSpanTags annotates span with the standard messaging/kafka
// tags derived from this reader's configuration (topic, brokers,
// consumer group, partition).
func (m *KafkaReader) setKafkaSpanTags(span xtrace.Span) {
	config := m.Config()
	span.SetTag(xtrace.TagComponent, pub.TraceComponent)
	span.SetTag(xtrace.TagPalfishMessageBusType, pub.TraceMessageBusTypeKafka)
	span.SetTag(xtrace.TagSpanKind, xtrace.SpanKindConsumer)
	span.SetTag(xtrace.TagMessageBusDestination, config.Topic)
	span.SetTag(xtrace.TagMessagingSystem, xtrace.MessagingSystemKafka)
	span.SetTag(xtrace.TagMessagingDestinationKind, xtrace.MessagingDestinationKindTopic)
	span.SetTag(xtrace.TagMessagingDestination, config.Topic)

	// Fix: the original condition was len(config.Brokers) == 0, which set
	// the broker tags only when the broker list was EMPTY (joining an
	// empty slice yields ""), and never when brokers were present.
	if len(config.Brokers) > 0 {
		span.SetTag(xtrace.TagPalfishKafkaConsumerBrokers, strings.Join(config.Brokers, ApolloMQBrokersSep))
		span.SetTag(xtrace.TagNetPeerIP, strings.Join(config.Brokers, ApolloMQBrokersSep))
	}

	if config.GroupID != "" {
		span.SetTag(xtrace.TagPalfishKafkaConsumerGroupID, config.GroupID)
		span.SetTag(xtrace.TagMessagingKafkaConsumerGroup, config.GroupID)
	}

	if config.Partition != 0 {
		span.SetTag(xtrace.TagPalfishKafkaConsumerPartition, config.Partition)
		span.SetTag(xtrace.TagMessagingKafkaPartition, config.Partition)
	}
}

// ReadMsg reads (and, under a consumer group, commits per the reader's
// CommitInterval) the next message, records the read latency, and
// JSON-decodes the payload into both v and ov.
func (m *KafkaReader) ReadMsg(ctx context.Context, v interface{}, ov interface{}) error {
	if span := xtrace.SpanFromContext(ctx); span != nil {
		m.setKafkaSpanTags(span)
	}

	st := xtime.NewTimeStat()
	msg, err := m.ReadMessage(ctx)
	pub.StatReqDuration(ctx, m.Config().Topic, "KafkaReader.ReadMessage", pub.TraceMessageBusTypeKafka, st.Millisecond())
	if err != nil {
		return err
	}

	// Decode the same payload into both destinations.
	for _, dst := range []interface{}{v, ov} {
		if err := json.Unmarshal(msg.Value, dst); err != nil {
			return err
		}
	}

	return nil
}

// FetchMsg fetches the next message without committing it, records the
// fetch latency, JSON-decodes the payload into both v and ov, and
// returns a KafkaHandler the caller can use to commit the offset.
func (m *KafkaReader) FetchMsg(ctx context.Context, v interface{}, ov interface{}) (*KafkaHandler, error) {
	if span := xtrace.SpanFromContext(ctx); span != nil {
		m.setKafkaSpanTags(span)
	}

	st := xtime.NewTimeStat()
	msg, err := m.FetchMessage(ctx)
	pub.StatReqDuration(ctx, m.Config().Topic, "KafkaReader.FetchMessage", pub.TraceMessageBusTypeKafka, st.Millisecond())
	if err != nil {
		return nil, err
	}

	// Decode the same payload into both destinations.
	for _, dst := range []interface{}{v, ov} {
		if err := json.Unmarshal(msg.Value, dst); err != nil {
			return nil, err
		}
	}

	return NewKafkaHandler(m.Reader, msg), nil
}

// Close stops the background stats goroutine (if one is running) and
// closes the underlying kafka reader.
//
// Fix: the original sent on m.stop, which blocks forever when stop is
// nil (readers built via NewReader) or when the stats goroutine has
// already exited (e.g. a second Close). Closing the channel wakes the
// goroutine without needing an active receiver, and the nil guard keeps
// NewReader-built readers safe. Close remains single-use: calling it a
// second time would panic on the double close, where the original would
// deadlock instead.
func (m *KafkaReader) Close() error {
	if m.stop != nil {
		close(m.stop)
	}
	return m.Reader.Close()
}

// SetOffsetAt positions the reader at the first offset whose timestamp
// is at or after t, delegating to the underlying kafka.Reader.
func (m *KafkaReader) SetOffsetAt(ctx context.Context, t time.Time) error {
	return m.Reader.SetOffsetAt(ctx, t)
}

// SetOffset moves the reader to the given absolute offset.
// NOTE(review): ctx is accepted for signature symmetry with SetOffsetAt
// but is unused — the underlying kafka.Reader.SetOffset takes no context.
func (m *KafkaReader) SetOffset(ctx context.Context, offset int64) error {
	return m.Reader.SetOffset(offset)
}

// NewGroupReader builds a consumer-group KafkaReader for topic, using
// the middleware configuration to resolve brokers and commit interval.
// The configured CommitInterval indicates how often offsets are
// committed to the broker; if 0, commits are handled synchronously.
func NewGroupReader(ctx context.Context, topic, groupId string) (*KafkaReader, error) {
	middlewareConfig, err := GetMiddlewareConfig()
	if err != nil {
		return nil, err
	}

	config, err := middlewareConfig.GetMQConfig(ctx, topic, MiddlewareTypeKafka)
	if err != nil {
		return nil, err
	}

	// Only kafka-backed configs are supported here.
	if config.MQType != MiddlewareTypeKafka {
		return nil, fmt.Errorf("mqType %d error", config.MQType)
	}

	return NewKafkaReader(config.MQAddr, pub.WrapTopicFromContext(ctx, topic), groupId, 0, 1, 10e6, config.CommitInterval), nil
}

// NewPartitionReader builds a partition-bound KafkaReader for topic and
// seeks it to the offset corresponding to the configured OffsetAt
// timestamp (layout "2006-01-02T15:04:05").
//
// Fixes:
//   - the original validated OffsetAt and parsed the timestamp AFTER
//     creating the reader, leaking the reader and its background stats
//     goroutine on every early error return; validation now happens
//     first, and the reader is closed if SetOffsetAt fails.
//   - "return reader, err" with a known-nil err is now an explicit
//     "return reader, nil".
func NewPartitionReader(ctx context.Context, topic string, partition int) (*KafkaReader, error) {
	middlewareConfig, err := GetMiddlewareConfig()
	if err != nil {
		return nil, err
	}
	config, err := middlewareConfig.GetMQConfig(ctx, topic, MiddlewareTypeKafka)
	if err != nil {
		return nil, err
	}

	// Only kafka-backed configs are supported here.
	if config.MQType != MiddlewareTypeKafka {
		return nil, fmt.Errorf("mqType %d error", config.MQType)
	}

	offsetAt := config.OffsetAt
	if len(offsetAt) == 0 {
		return nil, fmt.Errorf("no offsetAt config found")
	}
	t, err := time.Parse("2006-01-02T15:04:05", offsetAt)
	if err != nil {
		return nil, err
	}

	reader := NewKafkaReader(config.MQAddr, pub.WrapTopicFromContext(ctx, topic), "", partition, 1, 10e6, 0)
	if err := reader.SetOffsetAt(ctx, t); err != nil {
		// Best-effort cleanup; the seek error is the one worth returning.
		_ = reader.Close()
		return nil, err
	}

	return reader, nil
}
